package com.apobates.forum.letterbox.impl.service;

import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.apobates.forum.letterbox.api.dao.InboxDao;
import com.apobates.forum.letterbox.api.service.InboxService;
import com.apobates.forum.letterbox.entity.ForumLetter;
import com.apobates.forum.letterbox.entity.ForumLetterStatus;
import com.apobates.forum.letterbox.entity.ForumLetterTypeEnum;
import com.apobates.forum.letterbox.entity.Inbox;
import com.apobates.forum.utils.DateTimeUtils;
import com.apobates.forum.utils.persistence.Page;
import com.apobates.forum.utils.persistence.Pageable;

@Service
public class InboxServiceImpl implements InboxService{
	@Autowired
	private InboxDao inboxDao;
	
	@Override
	public Page<ForumLetter> getInBox(long memberId, Pageable pageable) {
		Page<ForumLetter> rs = inboxDao.findAllByReceiver(memberId, pageable);
		
		final Stream<ForumLetter> result = associationLetterStatusAsync(rs.getResult().collect(Collectors.toList()));
		return new Page<ForumLetter>(){
			@Override
			public long getTotalElements() {
				return rs.getTotalElements();
			}
			@Override
			public Stream<ForumLetter> getResult() {
				return result;
			}
		};
	}

	@Override
	public Page<ForumLetter> getInBoxGroupSender(long memberId, Pageable pageable) {
		return inboxDao.findAllByReceiverGroupSender(memberId, pageable);
	}

	@Override
	public Page<ForumLetter> getInBox(long memberId, ForumLetterTypeEnum label, Pageable pageable) {
		Page<ForumLetter> rs = inboxDao.findAllByReceiverAndType(memberId, label, pageable);
		
		final Stream<ForumLetter> result = associationLetterStatusAsync(rs.getResult().collect(Collectors.toList()));
		return new Page<ForumLetter>(){
			@Override
			public long getTotalElements() {
				return rs.getTotalElements();
			}
			@Override
			public Stream<ForumLetter> getResult() {
				return result;
			}
		};
	}

	@Override
	public long countForMemberMessages(long memberId) {
		return inboxDao.countForUnReadable(memberId);
	}

	@Override
	public Map<Long, Long> groupForMemberMessages(long memberId, Set<Long> senderIdSet) {
		if(senderIdSet.isEmpty()){
			return Collections.emptyMap();
		}
		return inboxDao.groupForUnReadable(memberId, senderIdSet);
	}

	@Override
	public Stream<ForumLetter> getReadableMessages(long memberId, int size) {
		return inboxDao.findAllForUnReadable(memberId, size);
	}

	@Override
	public int remove(long memberId, List<Long> idList)throws IllegalStateException {
		if(idList==null || idList.isEmpty()){
			throw new IllegalStateException("参数不合法或不被接受");
		}
		return inboxDao.editDeleted(memberId, idList);
	}

	@Override
	public int readed(long memberId, List<Long> idList)throws IllegalStateException {
		if(idList==null || idList.isEmpty()){
			throw new IllegalStateException("参数不合法或不被接受");
		}
		return inboxDao.editReadabled(memberId, idList);
	}

	@Override
	public Optional<Boolean> readed(long memberId, long sender) {
		return inboxDao.editReadabled(memberId, sender);
	}

	@Override
	public int readed(long memberId) {
		return inboxDao.editReadabled(memberId);
	}

	@Override
	public Stream<ForumLetter> get(long sender, long memberId, int prevUnixStamp) {
		LocalDateTime startDateTime = DateTimeUtils.getDateTimeByUnixTimestamp(prevUnixStamp);
		return inboxDao.findAll(sender, memberId, startDateTime);
	}

	@Override
	public Stream<ForumLetter> get(long sender, long memberId) {
		return inboxDao.findAll(sender, memberId);
	}

	private Stream<ForumLetter> associationLetterStatusAsync(final List<ForumLetter> data) {
		if (null == data || data.isEmpty()) {
			return Stream.empty();
		}
		final Map<Long, Inbox> boxes = CompletableFuture
				.supplyAsync(() -> data.stream().map(ForumLetter::getId).collect(Collectors.toSet()))
				.thenCompose(letterIdSet -> CompletableFuture.supplyAsync(() -> inboxDao.findAllByLetter(letterIdSet)
						.collect(Collectors.toMap(Inbox::getLetter, Function.identity()))))
				.join();
		Consumer<ForumLetter> action = fl -> {
			Inbox ti = boxes.get(fl.getId());
			fl.setStatus(new ForumLetterStatus(ti.getId(), ti.isReadable(), ti.isReply(), ti.isUsable()));
		};
		return data.parallelStream().peek(action).filter(Objects::nonNull);
	}
}
